﻿#include "zookeeper_client.hh"
#include "../../util/string_util.hh"
#include "zookeeper.h"
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

#define USE_TEMP_ZNODE

namespace kratos {
namespace service {

constexpr static int BUFFER_LENGTH = 1024 * 16; ///< 值的最大长度

ZookeeperClient::ZookeeperClient() {
}

ZookeeperClient::~ZookeeperClient() {
  disconnect();
}

auto ZookeeperClient::get_changed_path(std::vector<std::string>& path) ->void {
  std::lock_guard<std::mutex> lock(child_cache_mutex_);
  for (const auto& k : changed_keys_) {
    path.emplace_back(k);
  }
  changed_keys_.clear();
}

auto ZookeeperClient::is_changed()->bool {
  if (!dirty_flag_) {
    return false;
  }
  dirty_flag_ = false;
  return true;
}

auto ZookeeperClient::reconnect() -> bool {
  if (is_connected()) {
    return false;
  }
  return connect(host_, timeout_);
}

auto ZookeeperClient::dump(std::ostream &os) -> void {
  {
    std::lock_guard<std::mutex> lock(key_cache_mutex_);
    for (const auto &it : key_cache_) {
      os << it.first << "," << it.second << std::endl;
    }
  }
  {
    std::lock_guard<std::mutex> lock(child_cache_mutex_);
    for (const auto &it : child_cache_) {
      os << it.first << ":" << std::endl;
      for (const auto &child : it.second) {
        os << "  " << child << std::endl;
      }
    }
  }
}

auto ZookeeperClient::connect(const std::string &host, int timeout) -> bool {
  if (is_connected()) {
    return true;
  }
  if (!check_host(host)) {
    return false;
  }
  host_ = host;
  timeout_ = timeout;
  // 关闭zookeeper客户端日志
  zoo_set_debug_level((ZooLogLevel)0);
  zhandle_ = zookeeper_init(host.c_str(), init_watch_fn, timeout, nullptr,
                            reinterpret_cast<void *>(this), 0);
  if (zhandle_ == nullptr) {
    return false;
  }
  int count = 0;
  while (!is_connected_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    count += 1;
    if (timeout < count * 10) {
      break;
    }
  }
  return is_connected_.load();
}

auto ZookeeperClient::disconnect() -> bool {
  if (zhandle_ && is_connected()) {
    for (const auto &key : my_keys_) {
      zoo_delete(zhandle_, key.c_str(), -1);
    }
  }
  is_connected_.store(false);
  if (zhandle_) {
    zookeeper_close(zhandle_);
  }
  zhandle_ = nullptr;
  key_cache_.clear();
  child_cache_.clear();
  my_keys_.clear();
  return true;
}

auto ZookeeperClient::get(const std::string &key, std::string &value) -> bool {
  static std::unique_ptr<char> buffer_ {new char[BUFFER_LENGTH]}; ///< 缓冲区
  auto it = key_cache_.find(key);
  if (it != key_cache_.end()) {
    value = it->second;
    return true;
  }
  if (!is_connected()) {
    return false;
  }
  auto retval = zoo_wexists(zhandle_, key.c_str(), wexists_watch_fn,
                            reinterpret_cast<void *>(this), nullptr);
  if (ZOK != retval) {
    return false;
  }
  int length = BUFFER_LENGTH - 1;
  memset(buffer_.get(), 0, BUFFER_LENGTH);
  retval =
      zoo_wget(zhandle_, key.c_str(), wget_watch_fn,
               reinterpret_cast<void *>(this), buffer_.get(), &length, nullptr);
  if (ZOK != retval) {
    return false;
  }
  std::lock_guard<std::mutex> lock(key_cache_mutex_);
  value.assign(buffer_.get(), length);
  key_cache_.insert(std::make_pair(key, value));
  return true;
}

auto ZookeeperClient::get_children(const std::string &key,
                                   std::list<std::string> &children) -> bool {
  if (!is_connected()) {
    return false;
  }
  auto it = child_cache_.find(key);
  if (it != child_cache_.end()) {
    children = it->second;
    return true;
  }
  struct String_vector str_vec;
  auto retval = zoo_wget_children(zhandle_, key.c_str(), wget_children_watch_fn,
                                  reinterpret_cast<void *>(this), &str_vec);
  if (ZOK != retval) {
    return false;
  }
  // 更新子节点
  for (int i = 0; i < str_vec.count; ++i) {
    children.push_back(str_vec.data[i]);
  }
  std::lock_guard<std::mutex> lock(child_cache_mutex_);
  child_cache_.insert(std::make_pair(key, children));
  return true;
}

auto ZookeeperClient::get_children(const std::string &key,
                                   std::vector<std::string> &children) -> bool {
  children.clear();
  std::list<std::string> temp_list;
  if (!get_children(key, temp_list)) {
    return false;
  }
  for (const auto &i : temp_list) {
    children.push_back(i);
  }
  return true;
}

auto ZookeeperClient::get_children_count(const std::string &key)
    -> std::size_t {
  auto it = child_cache_.find(key);
  if (it != child_cache_.end()) {
    return it->second.size();
  }
  return 0;
}

auto ZookeeperClient::set(const std::string &key, const std::string &value)
    -> bool {
  if (!is_connected()) {
    return false;
  }
  auto retval = zoo_exists(zhandle_, key.c_str(), 0, nullptr);
  if (ZOK != retval) {
#ifdef USE_TEMP_ZNODE
    retval = zoo_create(zhandle_, key.c_str(), value.c_str(),
                        static_cast<int>(value.size()), &ZOO_OPEN_ACL_UNSAFE,
                        ZOO_EPHEMERAL, nullptr, 0);
#else
    retval = zoo_create(zhandle_, key.c_str(), value.c_str(),
                        static_cast<int>(value.size()), &ZOO_OPEN_ACL_UNSAFE,
                        ZOO_PERSISTENT, nullptr, 0);
#endif
    if (ZOK != retval) {
      return false;
    }
  } else {
    retval = zoo_set(zhandle_, key.c_str(), value.c_str(),
                     static_cast<int>(value.size()), -1);
    if (ZOK != retval) {
      return false;
    }
  }
  my_keys_.insert(key);
  key_cache_[key] = value;
  return true;
}

auto ZookeeperClient::set_path(const std::string &key, const std::string &value)
    -> bool {
  if (!is_connected()) {
    return false;
  }
  auto retval = zoo_exists(zhandle_, key.c_str(), 0, nullptr);
  if (ZOK != retval) {
    retval = zoo_create(zhandle_, key.c_str(), value.c_str(),
                        static_cast<int>(value.size()), &ZOO_OPEN_ACL_UNSAFE,
                        ZOO_PERSISTENT, nullptr, 0);
    if (ZOK != retval) {
      return false;
    }
    return true;
  }
  retval = zoo_set(zhandle_, key.c_str(), value.c_str(),
                   static_cast<int>(value.size()), -1);
  if (ZOK != retval) {
    return false;
  }
  return true;
}

auto ZookeeperClient::remove(const std::string &key) -> bool {
  if (!is_connected()) {
    return false;
  }
  auto retval = zoo_delete(zhandle_, key.c_str(), -1);
  if (ZOK != retval) {
    return false;
  }
  my_keys_.erase(key);
  return true;
}

auto ZookeeperClient::exists(const std::string &key) -> bool {
  if (!is_connected()) {
    return false;
  }
  auto retval = zoo_exists(zhandle_, key.c_str(), 0, nullptr);
  if (ZOK != retval) {
    return false;
  }
  return true;
}

auto ZookeeperClient::remove_cache(const std::string &key,
                                   const std::string &value) -> void {
  std::lock_guard<std::mutex> lock(child_cache_mutex_);
  auto it = child_cache_.find(key);
  if (it == child_cache_.end()) {
    return;
  }
  for (auto child_it = it->second.begin(); child_it != it->second.end();
       child_it++) {
    if (*child_it == value) {
      it->second.erase(child_it);
      return;
    }
  }
}

auto ZookeeperClient::is_connected() -> bool { return is_connected_.load(); }

void ZookeeperClient::init_watch_fn(zhandle_t *zh, int type, int state,
                                    const char *path, void *context) {
  auto *client = reinterpret_cast<ZookeeperClient *>(context);
  if (type == ZOO_SESSION_EVENT) {
    if (state == ZOO_CONNECTED_STATE) {
      // 连接成功
      client->is_connected_.store(true);
    } else if (state == ZOO_EXPIRED_SESSION_STATE) {
      // 链接断开
      client->disconnect();
    } else if (state == ZOO_NOTCONNECTED_STATE) {
      // 链接断开
      client->disconnect();
    } else if (state == ZOO_ASSOCIATING_STATE) {
      return;
    } else if (state == ZOO_CONNECTING_STATE) {
      return;
    } else if (state == ZOO_AUTH_FAILED_STATE) {
      return;
    } else if (state == ZOO_READONLY_STATE) {
      return;
    } else if (state == ZOO_NOTCONNECTED_STATE) {
      return;
    }
  }
}

void ZookeeperClient::wexists_watch_fn(zhandle_t *zh, int type, int state,
                                       const char *path, void *context) {
  static std::unique_ptr<char> buffer_ {new char[BUFFER_LENGTH]}; ///< 缓冲区
  auto *client = reinterpret_cast<ZookeeperClient *>(context);
  if (type == ZOO_CHANGED_EVENT) {
    int length = BUFFER_LENGTH - 1;
    std::memset(buffer_.get(), 0, BUFFER_LENGTH);
    auto retval = zoo_wget(client->zhandle_, path, wget_watch_fn, context,
        buffer_.get(), &length, nullptr);
    if (ZOK != retval) {
      return;
    }
    // 更新缓存
    std::lock_guard<std::mutex> lock(client->key_cache_mutex_);
    client->key_cache_[path] = buffer_.get();
  } else if (type == ZOO_DELETED_EVENT) {
    // 更新缓存
    std::lock_guard<std::mutex> lock(client->key_cache_mutex_);
    client->key_cache_.erase(path);
  } else if (type == ZOO_CREATED_EVENT) {
    int length = BUFFER_LENGTH - 1;
    std::memset(buffer_.get(), 0, BUFFER_LENGTH);
    auto retval = zoo_wget(client->zhandle_, path, wget_watch_fn, context,
        buffer_.get(), &length, nullptr);
    if (ZOK != retval) {
      return;
    }
    // 更新缓存
    std::lock_guard<std::mutex> lock(client->key_cache_mutex_);
    client->key_cache_[path] = buffer_.get();
  }
}

void ZookeeperClient::wget_watch_fn(zhandle_t *zh, int type, int state,
                                    const char *path, void *context) {
  static std::unique_ptr<char> buffer_ {new char[BUFFER_LENGTH]}; ///< 缓冲区
  auto *client = reinterpret_cast<ZookeeperClient *>(context);
  if (type == ZOO_CHANGED_EVENT) {
    int length = BUFFER_LENGTH - 1;
    std::memset(buffer_.get(), 0, BUFFER_LENGTH);
    auto retval = zoo_wget(client->zhandle_, path, wget_watch_fn, context,
        buffer_.get(), &length, nullptr);
    if (ZOK != retval) {
      return;
    }
    // 更新缓存
    std::lock_guard<std::mutex> lock(client->key_cache_mutex_);
    client->key_cache_[path] = buffer_.get();
  } else if (type == ZOO_DELETED_EVENT) {
    // 更新缓存
    std::lock_guard<std::mutex> lock(client->key_cache_mutex_);
    client->key_cache_.erase(path);
  }
}

void ZookeeperClient::wget_children_watch_fn(zhandle_t *zh, int type, int state,
                                             const char *path, void *context) {
  auto *client = reinterpret_cast<ZookeeperClient *>(context);
  if (type == ZOO_CHILD_EVENT) {
    struct String_vector str_vec;
    auto retval = zoo_wget_children(client->zhandle_, path,
                                    wget_children_watch_fn, context, &str_vec);
    if (ZOK != retval) {
      return;
    }
    // 更新子节点
    std::list<std::string> children;
    for (int i = 0; i < str_vec.count; ++i) {
      children.push_back(str_vec.data[i]);
    }
    deallocate_String_vector(&str_vec);
    std::lock_guard<std::mutex> lock(client->child_cache_mutex_);
    client->child_cache_[path] = children;
    client->changed_keys_.emplace(path);
    client->dirty_flag_ = true;
  }
}

bool ZookeeperClient::check_host(const std::string &host) {
  if (host.empty()) {
    return false;
  }
  std::vector<std::string> result;
  util::split(host, ",", result);
  for (const auto &i : result) {
    std::vector<std::string> pair;
    util::split(i, ":", pair);
    if (pair.size() == 2) {
      if (!util::is_ip_address(pair[0])) {
        return false;
      }
      if (!util::isnumber(pair[1])) {
        return false;
      }
      try {
        auto n = std::stoi(pair[1]);
        if ((n < 0) || (n > 65535)) {
          return false;
        }
      } catch (...) {
        return false;
      }
    } else {
      return false;
    }
  }
  return true;
}

} // namespace service
} // namespace kratos
